
#include "config.h"

#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "lsv_help.h"
#include "lsv_volume.h"
#include "lsv_volume_gc.h"
#include "lsv_log.h"
#include "volume_proto.h"
#include "lsv_chunk_pool.h"

#define print_vol_gc_info(vachk_list, type) \
        DINFO("<%u,%u>,<%u,%u>,%u,%u\n", (vachk_list)->malloc_record,\
                        ((vachk_list)->record + (vachk_list)->malloc_record)->count,\
                        (vachk_list)->free_record,\
                        ((vachk_list)->record + (vachk_list)->free_record)->count,\
                        (vachk_list)->next_chkid, (type));


#define PUSH_ROLLBACK                   (0)
#define POP_ROLLBACK                    (1)

#define LSV_CHUNK_ALLOCATE_STEP         (64)
#define MIN_CHUNK_MAP_SIZE              (1024)
#define MAX_CHUNK_MAP_SIZE              ((LSV_CHUNK_SIZE - sizeof(chunk_pool_t)) / sizeof(lsv_u32_t))

//espicially for metadata.
#define MIN_ZERO_CHUNK_MAP_SIZE         (64)
#define MAX_ZERO_CHUNK_MAP_SIZE         1024

#define LSV_CHUNK_POOL_RESERVED_CHUNKS  2
#define LSV_CHUNK_DEFAULT_POOL_ID       1
#define LSV_CHUNK_ZERO_POOL_ID          2

#define LSV_CHUNK_POOL_UPDATE_ALL       0
#define LSV_CHUNK_POOL_UPDATE_META      1

#define SKIP_DISCARD                    1
//persistent data.
typedef struct chunk_pool {
        lsv_u32_t               self_chkid;     /* chunk id this pool record is persisted in */
        lsv_u32_t               next_chkid;     /* next pool chunk in the on-disk chain, 0 = end */
        lsv_u32_t               alloc_chkid;    /* highest chunk id handed out so far (monotonic) */
        lsv_u32_t               pos;            /* ring-buffer head index into array[] */
        lsv_u32_t               len;            /* number of valid ids currently in array[] */
        lsv_u32_t               array[0];       /* trailing ring of chunk ids, sized by the chunk */
} chunk_pool_t;


/* One node of the in-memory discard list; owns its backing pool chunk. */
typedef struct discard_item {
        struct list_head        hook;           /* linkage into lsv_chunk_info_t.discard_list */

        chunk_pool_t            *unit;          /* backing LSV_CHUNK_SIZE pool record */
} discard_item_t;


/* A bounded queue of chunk ids backed by a single chunk_pool_t. */
typedef struct map_queue {
        uint32_t                min;            /* low-water mark (refill threshold) */
        uint32_t                max;            /* capacity cap for unit->len */

        chunk_pool_t            *unit;          /* backing pool record */
} map_queue_t;


/* Per-volume chunk-pool state, hung off lsv_volume_proto_t.volgc_info. */
typedef struct lsv_chunk_info {

        struct list_head        discard_list;   /* chain of discard_item_t holding freed ids */

        lsv_lock_t              lock;           /* guards all queue/list mutation */
        uint32_t                loaded;         /* set once init/load completed successfully */

        map_queue_t             avail_queue;
        map_queue_t             zero_queue;     //do it later.
} lsv_chunk_info_t;

/* Allocate a discard_item_t together with its zeroed backing chunk.
 * Returns NULL on allocation failure — never a partially built item. */
static discard_item_t *discard_item_create(void)
{
        discard_item_t *item = NULL;
        chunk_pool_t *unit = NULL;
        int err;

        err = ymalloc((void **)&unit, LSV_CHUNK_SIZE);
        if (unlikely(err)) {
                DINFO("volgc alloc memory fail, %d\n", err);
                return NULL;
        }
        memset(unit, 0, LSV_CHUNK_SIZE);

        err = ymalloc((void **)&item, sizeof(*item));
        if (unlikely(err)) {
                DINFO("volgc alloc memory fail, %d\n", err);
                yfree((void **)&unit);
                return NULL;
        }

        item->unit = unit;
        return item;
}

/* Release a discard item and its backing chunk; a NULL item is tolerated. */
static void discard_item_destroy(discard_item_t *item)
{
        DINFO("volgc discard item destroy.\n");

        if (unlikely(item == NULL))
                return;

        yfree((void **)&item->unit);
        yfree((void **)&item);
}

/* Scan the discard list for an item whose chunk map still has room.
 * Returns NULL when every item is full (or the list is empty). */
static discard_item_t *discard_get_free_item(lsv_chunk_info_t *info)
{
        struct list_head *cursor;

        list_for_each(cursor, &info->discard_list) {
                discard_item_t *item = (discard_item_t *)cursor;

                if (item->unit->len != MAX_CHUNK_MAP_SIZE)
                        return item;
        }

        return NULL;
}

/* Build a fresh, zeroed in-memory chunk pool record.  Ids below
 * LSV_CHUNK_POOL_RESERVED_CHUNKS are reserved for pool metadata, so
 * allocation starts right after them. */
static inline chunk_pool_t * lsv_chunk_pool_proto_new()
{
        chunk_pool_t *pool;

        pool = xmalloc(LSV_CHUNK_SIZE);
        assert(pool);
        memset(pool, 0x00, LSV_CHUNK_SIZE);

        pool->alloc_chkid = LSV_CHUNK_POOL_RESERVED_CHUNKS;
        return pool;
}

/* Persist a chunk pool record into its own chunk.
 * LSV_CHUNK_POOL_UPDATE_META writes only the first page (the header
 * fields); any other level writes the whole chunk including array[]. */
static inline int lsv_chunk_pool_proto_save(lsv_volume_proto_t *lsv_info, chunk_pool_t *chunk_pool, int level)
{
        lsv_volume_io_t vio;
        lsv_u32_t size;
        int ret;

        LSV_DBUG("save to chunk pool,chunk_id=%d\r\n", chunk_pool->self_chkid);

        size = (level == LSV_CHUNK_POOL_UPDATE_META) ? LSV_PAGE_SIZE : LSV_CHUNK_SIZE;
        lsv_volume_io_init(&vio, chunk_pool->self_chkid, 0, size, LSV_VOLGC_STORAGE_TYPE);

        ret = lsv_volume_chunk_update(lsv_info, &vio, (lsv_s8_t *)chunk_pool);
        if (ret) {
                DERROR("volgc chunk update fail, %d\n", ret);
                return ret;
        }

        return 0;
}

/* Read one pool record (chunk_id) from the volume into chunk_pool and
 * sanity-check that the on-disk record agrees about its own id. */
static inline int lsv_chunk_pool_proto_load(lsv_volume_proto_t *lsv_info, chunk_pool_t *chunk_pool, lsv_u32_t chunk_id)
{
        int err;

        assert(chunk_id > 0);

        DINFO("volgc load chunk id %u\n", chunk_id);

        err = lsv_volume_chunk_read(lsv_info, chunk_id, (lsv_s8_t *)chunk_pool);
        if (err) {
                DERROR("volgc load free chunk queue fail, %d\n", err);
                return err;
        }

        assert(chunk_pool->self_chkid == chunk_id);
        return 0;
}

/* Append `count` chunk ids at the tail of the ring.  Used for frees,
 * which are mostly batched and touch the array body, so callers persist
 * the full chunk afterwards. */
static inline void lsv_chunk_pool_proto_push(chunk_pool_t *unit, lsv_u32_t *chunk_id, lsv_u32_t count)
{
        lsv_u32_t i;

        for (i = 0; i < count; i++) {
                lsv_u32_t slot = (unit->pos + unit->len) % MAX_CHUNK_MAP_SIZE;

                unit->array[slot] = chunk_id[i];
                unit->len++;

                assert(chunk_id[i]);
                LSV_DBUG("chunk push to aval: %d\r\n", chunk_id[i]);
        }
}

/* Pop `count` chunk ids from the head of the ring.  Only the header
 * fields (pos/len) change, so callers persist with UPDATE_META — the 4k
 * head — only. */
static inline void lsv_chunk_pool_proto_pop(chunk_pool_t *chunk_pool, lsv_u32_t *chunk_id, lsv_u32_t count)
{
        lsv_u32_t i, pos;

        for (i = 0; i < count; i++) {
                pos = chunk_pool->pos % MAX_CHUNK_MAP_SIZE;
                chunk_id[i] = chunk_pool->array[pos];
                chunk_pool->pos = (chunk_pool->pos + 1) % MAX_CHUNK_MAP_SIZE;
                /* BUGFIX: the decrement used to live inside assert(), which
                 * compiles away under NDEBUG and would leave len permanently
                 * inflated.  Check first, then decrement unconditionally. */
                assert(chunk_pool->len > 0);
                chunk_pool->len--;

                LSV_DBUG("chunk pop from aval: %d\r\n", chunk_id[i]);
        }
}

/* Undo the last `count`-element push or pop on the ring after a failed
 * persist, restoring pos/len to their pre-operation values. */
static inline void lsv_chunk_pool_proto_rollback(chunk_pool_t *chunk_pool, lsv_u32_t count, int operate)
{
        assert(count <= MAX_CHUNK_MAP_SIZE);

        if (operate == PUSH_ROLLBACK) {
                chunk_pool->len -= count;
        } else if (operate == POP_ROLLBACK) {
                chunk_pool->len += count;
                /* BUGFIX: keep pos inside [0, MAX_CHUNK_MAP_SIZE); the old
                 * "pos > count" test left pos == MAX_CHUNK_MAP_SIZE when
                 * pos == count — an out-of-range value that only worked
                 * because every reader happens to re-mod it. */
                chunk_pool->pos = (chunk_pool->pos + MAX_CHUNK_MAP_SIZE - count)
                                % MAX_CHUNK_MAP_SIZE;
        } else {
                DWARN("Do nothing!!\n");
        }
}

/* Translate a batch of local chunk indexes into volume chkid_t records
 * and ask the volume layer to allocate them.  `fill` is passed through
 * to chunk_allocate() unchanged.  Returns 0 on success. */
static int lsv_volume_chunk_allocate(lsv_volume_proto_t *lsv_info, lsv_u32_t *chunk_id, int chknum, int fill)
{
        volume_proto_t *volume_proto = lsv_info->volume_proto;
        chkid_t *chkid = NULL;
        int ret, i;

        YASSERT(volume_proto);

        ret = ymalloc((void **)&chkid, sizeof(chkid_t) * chknum);
        if (ret) {
                DERROR("volgc alloc memory fail, %d\n", ret);
                return ret;
        }

        for (i = 0; i < chknum; i++) {
                chkid[i].id = volume_proto->chkid.id;
                chkid[i].type = __RAW_CHUNK__;
                chkid[i].idx = chunk_id[i];

                LSV_DBUG("chunk allocated: %d\r\n", chkid[i].idx);
        }

        LSV_DBUG("volgc chunk allocate %d\n", chknum);
        ret = volume_proto->chunk_allocate(volume_proto, chkid, chknum, fill);
        if (ret)
                DERROR("volgc chunk allocate fail, %d\n", ret);

        /* single exit: chkid is freed on success and failure alike */
        yfree((void **)&chkid);
        return ret;
}

/**
 * Pop at most `count` ids off the in-memory avail queue and persist the
 * updated header.
 *
 * if success,
 *      return malloc count
 * else
 *      return -errno
 */
static inline int lsv_chunk_pool_alloc_from_avail_queue(lsv_volume_proto_t *lsv_info, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        lsv_chunk_info_t *info = lsv_info->volgc_info;
        chunk_pool_t *pool;
        int ret;

        assert(info->loaded);

        pool = info->avail_queue.unit;
        if (count > pool->len)
                count = pool->len;

        lsv_chunk_pool_proto_pop(pool, chunk_id, count);

        if (count == 0)
                return 0;

        ret = lsv_chunk_pool_proto_save(lsv_info, pool, LSV_CHUNK_POOL_UPDATE_META);
        if (ret) {
                /* undo the pop so the in-memory queue matches disk again */
                lsv_chunk_pool_proto_rollback(pool, count, POP_ROLLBACK);
                DERROR("volgc chunk update fail, %d\n", ret);
                return -ret;
        }

        return count;
}

/**
 * Harvest up to `count` previously-discarded chunk ids from the discard
 * list, draining items front to back.  Returns the number collected.
 *
 * NOTE(review): the drained items are neither persisted nor removed when
 * emptied — the original author left this open ("should these be pushed
 * to the free queue and persisted?").  TODO confirm before enabling this
 * path (currently compiled out via SKIP_DISCARD).
 */
static int lsv_volume_malloc_from_discard_list(lsv_volume_proto_t *lsv_info, lsv_u32_t *chunk_id, lsv_u32_t count)
{
        lsv_u32_t tmp = 0, pos = 0, num = 0, i;
        discard_item_t *item = NULL;
        struct list_head *hook = NULL;
        lsv_chunk_info_t *info = lsv_info->volgc_info;

        tmp = count;

        list_for_each(hook, &info->discard_list) {
                item = (discard_item_t *)hook;
                /* take at most what this item holds */
                tmp = tmp > item->unit->len ? item->unit->len : tmp;
                for (i = 0; i < tmp; i++) {
                        pos = item->unit->pos % MAX_CHUNK_MAP_SIZE;
                        chunk_id[num++] = item->unit->array[pos];
                        item->unit->pos = (item->unit->pos + 1) % MAX_CHUNK_MAP_SIZE;
                        item->unit->len--;
                }

                count = count - tmp;
                /* count is unsigned, so this only fires at exactly 0 */
                if (count <= 0)
                        break;

                tmp = count;
        }

        return num;
}

/* Grow the avail queue by up to LSV_CHUNK_ALLOCATE_STEP chunk ids,
 * capped at avail_queue.max.  Ids come from the discard list first
 * (when that path is compiled in) and from freshly allocated volume
 * chunks for the remainder.  Returns 0 on success, error code otherwise. */
static int lsv_volume_extend_avail_queue(lsv_volume_proto_t *lsv_info)
{
        int ret;
        lsv_u32_t count, reused = 0, fresh, i;
        chunk_pool_t *chunk_pool = NULL;
        lsv_u32_t *chunk_id = NULL;
        lsv_chunk_info_t *info = lsv_info->volgc_info;

        chunk_pool = info->avail_queue.unit;
        if (chunk_pool->len + LSV_CHUNK_ALLOCATE_STEP > info->avail_queue.max)
                count = info->avail_queue.max - chunk_pool->len;
        else
                count = LSV_CHUNK_ALLOCATE_STEP;

        LSV_DBUG("volgc extend chunk count %d\n", count);
        ret = ymalloc((void **)&chunk_id, sizeof(lsv_u32_t) * count);
        if (ret) {
                DERROR("alloc memory fail, %d\n", ret);
                return ret;
        }

#if !SKIP_DISCARD
        /* BUGFIX: harvest at most `count` ids (the buffer size), not a
         * fixed LSV_CHUNK_ALLOCATE_STEP which overruns chunk_id when the
         * queue is near avail_queue.max. */
        ret = lsv_volume_malloc_from_discard_list(lsv_info, chunk_id, count);
        if (ret < 0) {
                DERROR("malloc from discard list fail, %d\n", -ret);
                ret = -ret;
                goto out_free;
        }
        reused = ret;
#endif

        /* allocate brand-new chunks for whatever the discard list could
         * not supply */
        fresh = count - reused;
        if (fresh > 0) {
                for (i = 0; i < fresh; i++)
                        chunk_id[reused + i] = ++(chunk_pool->alloc_chkid);

                ret = lsv_volume_chunk_allocate(lsv_info, chunk_id + reused, fresh, 1);
                if (ret) {
                        DERROR("volgc chunk allocate fail, %d\n", ret);
                        goto out_free;
                }
        }

        /* BUGFIX: push exactly the `count` ids we gathered; the old code
         * always pushed LSV_CHUNK_ALLOCATE_STEP, reading past the buffer
         * whenever count < STEP. */
        lsv_chunk_pool_proto_push(chunk_pool, chunk_id, count);

        ret = lsv_chunk_pool_proto_save(lsv_info, chunk_pool, LSV_CHUNK_POOL_UPDATE_ALL);
        if (ret) {
                lsv_chunk_pool_proto_rollback(chunk_pool, count, PUSH_ROLLBACK);
                goto out_free;
        }

        ret = 0;
out_free:
        /* BUGFIX: the old code leaked chunk_id on the success path. */
        yfree((void **)&chunk_id);
        return ret;
}

/* Rebuild the in-memory chunk pool state from disk: load the default
 * pool, then walk the on-disk chain of discard pools linked through
 * next_chkid, wrapping each in a discard_item_t. */
static int lsv_volume_load_chunk_queue(lsv_volume_proto_t *lsv_info)
{
        int ret;
        lsv_chunk_info_t *info = lsv_info->volgc_info;
        chunk_pool_t *chunk_pool = info->avail_queue.unit;

        DINFO("load the default chunk pool, chunk_id %u\n", LSV_CHUNK_DEFAULT_POOL_ID);

        ret = lsv_chunk_pool_proto_load(lsv_info, chunk_pool, LSV_CHUNK_DEFAULT_POOL_ID);
        if (ret) {
                DERROR("volgc load free chunk queue fail, %d\n", ret);
                return ret;
        }

        DINFO("load the sub chunk pool, chunk_id %u\n", chunk_pool->next_chkid);
        while (chunk_pool->next_chkid) {
                discard_item_t *item = discard_item_create();
                if (!item)
                        return -ENOMEM;

                /* BUGFIX: load the chained pool into the new item's unit;
                 * the old code loaded it into `chunk_pool`, clobbering the
                 * record just read and leaving item->unit empty. */
                ret = lsv_chunk_pool_proto_load(lsv_info, item->unit, chunk_pool->next_chkid);
                if (ret) {
                        DERROR("volgc load free chunk queue fail, %d\n", ret);
                        /* BUGFIX: don't leak the freshly created item */
                        discard_item_destroy(item);
                        return ret;
                }

                list_add_tail(&item->hook, &info->discard_list);

                /* continue the walk from the item just loaded */
                chunk_pool = item->unit;
        }

        DINFO("load finished.\r\n");

        return 0;
}

/* Initialize the per-volume chunk pool.  create_new != 0 formats a fresh
 * default pool; otherwise existing state is loaded from disk.  On
 * success lsv_info->volgc_info owns the new lsv_chunk_info_t. */
int lsv_chunk_pool_init(lsv_volume_proto_t *lsv_info, int create_new)
{
        int ret;
        lsv_chunk_info_t *info = NULL;

        DINFO("volgc "CHKID_FORMAT" create\n",
                CHKID_ARG(&lsv_info->volume_proto->chkid));

        /// 1.create memory struct
        ret = ymalloc((void **)&info, sizeof(lsv_chunk_info_t));
        if (ret) {
                DERROR("alloc memory fail, %d\n", ret);
                goto err_ret;
        }

        lsv_lock_init(&info->lock);
        INIT_LIST_HEAD(&info->discard_list);

        info->avail_queue.min = MIN_CHUNK_MAP_SIZE;
        info->avail_queue.max = MAX_CHUNK_MAP_SIZE;
        info->avail_queue.unit = lsv_chunk_pool_proto_new();

        lsv_info->volgc_info = (void *)info;

        if (create_new) {
                info->avail_queue.unit->self_chkid = LSV_CHUNK_DEFAULT_POOL_ID;
                /* must save to init it even when creating new */
                ret = lsv_chunk_pool_proto_save(lsv_info, info->avail_queue.unit, LSV_CHUNK_POOL_UPDATE_ALL);
                if (ret) {
                        DERROR("save chunk pool failed, %d\n", ret);
                        goto err_free;
                }
        } else {
                ret = lsv_volume_load_chunk_queue(lsv_info);
                if (unlikely(ret)) {
                        DERROR("load chunk pool failed, %d\n", ret);
                        goto err_free;
                }
        }

        DINFO("chunk pool initialized, mode=%d\r\n", create_new);

        info->loaded = 1;

        return 0;
err_free:
        /* BUGFIX: the old error path freed only `info`, leaking the pool
         * unit, the lock, and any discard items a partial load added. */
        {
                struct list_head *hook, *n;
                list_for_each_safe(hook, n, &info->discard_list) {
                        discard_item_t *item = (discard_item_t *)hook;
                        list_del(&item->hook);
                        discard_item_destroy(item);
                }
        }
        lsv_lock_destroy(&info->lock);
        xfree(info->avail_queue.unit);
        yfree((void **)&info);
        lsv_info->volgc_info = NULL;
err_ret:
        return ret;
}

/* Tear down everything lsv_chunk_pool_init() built.  Tolerates a NULL
 * volgc_info (partial init) and a NULL volume_proto. */
int lsv_chunk_pool_destory(lsv_volume_proto_t *lsv_info)
{
        lsv_chunk_info_t *info = lsv_info->volgc_info;
        discard_item_t *item = NULL;
        struct list_head *hook, *n;

        if (lsv_info->volume_proto)
                DINFO("volgc "CHKID_FORMAT" delete ...\n",
                                CHKID_ARG(&lsv_info->volume_proto->chkid));

        if (info) {
                lsv_lock_destroy(&info->lock);
                xfree(info->avail_queue.unit);

                list_for_each_safe(hook, n, &info->discard_list) {
                        item = (discard_item_t *)hook;
                        list_del(&item->hook);
                        xfree((void *)item->unit);
                        /* BUGFIX: the old code freed only item->unit and
                         * leaked the discard_item_t itself. */
                        yfree((void **)&item);
                }

                yfree((void **)&info);
        }

        lsv_info->volgc_info = NULL;
        return 0;
}

/**
 * Lock-free single-chunk allocation: the caller must already hold
 * info->lock (lsv_chunk_info_t.lock); no locking is done here.
 */
int lsv_chunk_pool_malloc_nolock(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t *chunk_id)
{
        return lsv_chunk_pool_malloc_batch_nolock(lsv_info, type, 1, chunk_id);
}

/* Allocate a single chunk id via the locking batch path (count == 1). */
int lsv_chunk_pool_malloc(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t *chunk_id)
{
        return lsv_chunk_pool_malloc_batch(lsv_info, type, 1, chunk_id);
}

/* Return a single chunk id via the locking batch free path (count == 1). */
int lsv_chunk_pool_free(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t chunk_id)
{
        return lsv_chunk_pool_free_batch(lsv_info, type, 1, &chunk_id);
}

/**
 * Batch allocation WITHOUT taking info->lock — the caller must hold it
 * (see lsv_chunk_pool_malloc_batch for the locking variant).
 *
 * NOTE(review): the return value of lsv_volume_extend_avail_queue() is
 * ignored, and a second shortfall after the single extend attempt is not
 * detected here — the assert(chunk_id[i]) below would fire instead.
 * TODO confirm whether the retry loop used by lsv_chunk_pool_malloc_batch
 * is wanted here too.
 */
int lsv_chunk_pool_malloc_batch_nolock(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        int ret;

        DINFO("volgc "CHKID_FORMAT" malloc %u ...\n",
                CHKID_ARG(&lsv_info->volume_proto->chkid), count);


        ret = lsv_chunk_pool_alloc_from_avail_queue(lsv_info, count, chunk_id);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        } else {
                if (ret != count) {
                        /* queue exhausted: grow it once, then pop the rest */
                        lsv_volume_extend_avail_queue(lsv_info);

                        ret = lsv_chunk_pool_alloc_from_avail_queue(lsv_info, count - ret, chunk_id + ret);
                        if (ret < 0) {
                                ret = -ret;
                                GOTO(err_ret, ret);
                        }
                }
        }


        lsv_u32_t i;
        for (i = 0; i < count; i++) {
                LSV_DBUG("volgc "CHKID_FORMAT" malloc, chk_id: %u ...\n",
                        CHKID_ARG(&lsv_info->volume_proto->chkid), chunk_id[i]);

                        assert(chunk_id[i]);
        }

#if 0
        unit = info->avail_queue.unit;
        if (unit.len < info->avail_queue.min)
                schedule_task_new();
#endif

        return 0;
err_ret:
        return ret;
}

#if 0
/* DISABLED (#if 0): legacy locking implementation of
 * lsv_chunk_pool_malloc_batch, superseded by the retry-loop version in
 * the #else branch below.  Kept for reference only. */
int lsv_chunk_pool_malloc_batch(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        int ret;
        lsv_chunk_info_t *info = lsv_info->volgc_info;

        LSV_DBUG("volgc "CHKID_FORMAT" malloc %u ...\n",
                CHKID_ARG(&lsv_info->volume_proto->chkid), count);

        ret = lsv_lock(&info->lock);
        if (ret)
                goto err_ret;

        ret = lsv_chunk_pool_alloc_from_avail_queue(lsv_info, count, chunk_id);
        if (ret < 0) {
                lsv_unlock(&info->lock);
                ret = -ret;
                GOTO(err_ret, ret);
        } else {
                if (ret != count) {
                        lsv_volume_extend_avail_queue(lsv_info);

                        ret = lsv_chunk_pool_alloc_from_avail_queue(lsv_info, count - ret, chunk_id + ret);
                        if (ret < 0) {
                                lsv_unlock(&info->lock);
                                ret = -ret;
                                GOTO(err_ret, ret);
                        }
                }
        }

        lsv_u32_t i;
        for (i = 0; i < count; i++) {
                LSV_DBUG("volgc "CHKID_FORMAT" malloc, chk_id: %u ...\n",
                        CHKID_ARG(&lsv_info->volume_proto->chkid), chunk_id[i]);

                        assert(chunk_id[i]);
        }

        lsv_unlock(&info->lock);

#if 0
        unit = info->avail_queue.unit;
        if (unit.len < info->avail_queue.min)
                schedule_task_new();
#endif

        return 0;
//err_unlock:
//        lsv_unlock(&info->lock);
err_ret:
        return ret;
}

#else

/* Batch-allocate `count` chunk ids under info->lock, growing the avail
 * queue and retrying up to 3 times on shortfall.  On failure every id
 * already taken is returned to the pool and chunk_id is re-zeroed.
 * Returns 0 on success, positive error code otherwise. */
int lsv_chunk_pool_malloc_batch(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        int ret;
        lsv_chunk_info_t *info = lsv_info->volgc_info;

        DINFO("volgc "CHKID_FORMAT" malloc %u ...\n",
                        CHKID_ARG(&lsv_info->volume_proto->chkid), count);

        memset(chunk_id, 0x0, sizeof(lsv_u32_t) * count);

        ret = lsv_lock(&info->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        int left = count, retry = 0, alct = 0;
        while (left) {

                alct = lsv_chunk_pool_alloc_from_avail_queue(lsv_info, left, chunk_id + count - left);
                if (unlikely(alct < 0)) {
                        ret = -alct;
                        GOTO(err_unlock, ret);
                } else if (alct != left) {
                        DWARN("need %d left %d get %d, retry %d\n", count, left, alct, retry);
                        if (unlikely(retry >= 3)) {
                                ret = EAGAIN;
                                GOTO(err_unlock, ret);
                        }
                        retry ++;
                        /* best-effort grow; shortfall is caught next round */
                        lsv_volume_extend_avail_queue(lsv_info);
                } else {
                        YASSERT(alct == left);
                }
                left -= alct;
        }

        lsv_u32_t i;
        for (i = 0; i < count; i++) {
                DINFO("volgc "CHKID_FORMAT" malloc, chk_id: %u ...\n",
                                CHKID_ARG(&lsv_info->volume_proto->chkid), chunk_id[i]);

                assert(chunk_id[i]);
        }

        lsv_unlock(&info->lock);

        return 0;
err_unlock:
        /* BUGFIX: lsv_chunk_pool_free_batch() acquires info->lock itself;
         * the old code called it while still holding the lock, a
         * self-deadlock.  Release first, then hand back what we took. */
        lsv_unlock(&info->lock);
        if (likely(left < (int)count)) {
                lsv_chunk_pool_free_batch(lsv_info, type, count - left, chunk_id);
                memset(chunk_id, 0x0, sizeof(lsv_u32_t) * count);
        }
err_ret:
        return ret;
}
#endif

/**
 * Return a batch of chunk ids to the discard list, under info->lock:
 *   1. find a discard item with spare room (or create one)
 *   2. push the ids into it
 *   3. persist the touched pool chunks
 */
int lsv_chunk_pool_free_batch(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        int ret;
        lsv_u32_t self_chkid, tcount;
        discard_item_t *item = NULL;
        lsv_chunk_info_t *info = lsv_info->volgc_info;

        (void) type;

        if (count <= 0)
                return 0;

        LSV_DBUG("lsv_chunk_pool_free_batch, count=%d ...\n", count);

        ret = lsv_lock(&info->lock);
        if (unlikely(ret)) {
                DINFO("volgc lock fail, %d\n", ret);
                goto err_ret;
        }

again:
        if (list_empty(&info->discard_list) || !(item = discard_get_free_item(info))) {

                item = discard_item_create();
                if (item == NULL) {
                        /* BUGFIX: the old code jumped out with ret still 0,
                         * reporting success on allocation failure. */
                        ret = ENOMEM;
                        goto err_unlock;
                }

                ret = lsv_chunk_pool_malloc_nolock(lsv_info, LSV_VOLGC_STORAGE_TYPE, &self_chkid);
                if (unlikely(ret)) {
                        DINFO("volgc alloc chunk fail, %d\n", ret);
                        discard_item_destroy(item);
                        goto err_unlock;
                }

                item->unit->self_chkid = self_chkid;
                /* BUGFIX: link the new item in front of the existing chain;
                 * the old code overwrote the head's next_chkid, dropping
                 * every previously linked discard chunk on disk. */
                item->unit->next_chkid = info->avail_queue.unit->next_chkid;

                tcount = MAX_CHUNK_MAP_SIZE - item->unit->len;
                tcount = tcount > count ? count : tcount;

                lsv_chunk_pool_proto_push(item->unit, chunk_id, tcount);

                /* persist the new pool chunk first so the chain head never
                 * points at an unwritten chunk */
                ret = lsv_chunk_pool_proto_save(lsv_info, item->unit, LSV_CHUNK_POOL_UPDATE_ALL);
                if (ret) {
                        discard_item_destroy(item);
                        goto err_unlock;
                }

                /* persist the sibling chunk (chain head) */
                info->avail_queue.unit->next_chkid = self_chkid;
                ret = lsv_chunk_pool_proto_save(lsv_info, info->avail_queue.unit, LSV_CHUNK_POOL_UPDATE_META);
                if (ret) {
                        /* restore the in-memory head before bailing out */
                        info->avail_queue.unit->next_chkid = item->unit->next_chkid;
                        discard_item_destroy(item);
                        goto err_unlock;
                }

                list_add_tail(&item->hook, &info->discard_list);

        } else {
                tcount = MAX_CHUNK_MAP_SIZE - item->unit->len;
                tcount = tcount > count ? count : tcount;

                lsv_chunk_pool_proto_push(item->unit, chunk_id, tcount);

                /* persist the self chunk */
                ret = lsv_chunk_pool_proto_save(lsv_info, item->unit, LSV_CHUNK_POOL_UPDATE_ALL);
                if (ret) {
                        lsv_chunk_pool_proto_rollback(item->unit, tcount, PUSH_ROLLBACK);
                        goto err_unlock;
                }
        }

        if (tcount < count) {
                count -= tcount;
                chunk_id += tcount;

                goto again;
        }

        lsv_unlock(&info->lock);

        return 0;
err_unlock:
        lsv_unlock(&info->lock);
err_ret:
        return ret;
}
